package com.demo.join;

import com.demo.bean.OrderEvent;
import com.demo.bean.ReceiptEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.Date;

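/**
 * Flink series - real-time data warehouse: real-time reconciliation of e-commerce order payments (part 6).
 *
 * @see <a href="https://www.jianshu.com/p/e1cca7ca63d6">https://www.jianshu.com/p/e1cca7ca63d6</a>
 */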
public class PaymentOrderCheckIn {
    // Side-output tag ("unmatched-pay") for pay events that are still buffered when
    // OrderMatchFunction's timer fires, i.e. no receipt was seen for them in time.
    // The trailing "{ }" makes this an anonymous subclass; per Flink's side-output
    // documentation this is how the tag keeps its element type information.
    private static final OutputTag<OrderEvent> unmatchedPayEventOutputTag = new OutputTag<OrderEvent>("unmatched-pay") {
    };
    // Side-output tag ("unmatched-receipt") for receipt events that are still buffered
    // when OrderMatchFunction's timer fires, i.e. no pay event was seen for them in time.
    private static final OutputTag<ReceiptEvent> unmatchedReceiptEventOutputTag = new OutputTag<ReceiptEvent>("unmatched-receipt") {
    };

    /**
     * Builds and runs the reconciliation job.
     * <p>
     * Two CSV resources are read from the classpath ({@code /OrderEvent.csv} and
     * {@code /ReceiptEvent.csv}), parsed into events, given event-time timestamps with a
     * one-second out-of-orderness bound, and keyed by order id. Both matching
     * implementations ({@link #intervalJoinImpl} and {@link #processImpl}) are then attached
     * to the same pair of keyed streams before the job is executed, so both print their
     * results in a single run.
     *
     * @param args command-line arguments (not read)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 1. Order events: read the CSV lines and parse each one.
        String orderCsvPath = PaymentOrderCheckIn.class.getResource("/OrderEvent.csv").getPath();
        DataStream<String> orderLines = env.readTextFile(orderCsvPath);
        SingleOutputStreamOperator<OrderEvent> parsedOrders = orderLines.map(new MapFunction<String, OrderEvent>() {
            @Override
            public OrderEvent map(String line) throws Exception {
                // Columns: user id, order status, order id, timestamp
                String[] fields = line.split(",");
                return new OrderEvent(Long.parseLong(fields[0]), fields[1], fields[2], Long.parseLong(fields[3]));
            }
        });
        // Allow events to arrive up to 1s out of order.
        SingleOutputStreamOperator<OrderEvent> timedOrders = parsedOrders.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(OrderEvent element) {
                        // presumably the CSV timestamp is in seconds; scaled to milliseconds here
                        return element.getTimestamp() * 1000L;
                    }
                });
        // Only "pay" actions take part in the reconciliation; key them by order id.
        KeyedStream<OrderEvent, String> paysByOrderId = timedOrders
                .filter(order -> order.getAction().equals("pay"))
                .keyBy(order -> order.getOrId());

        // 2. Receipt events: same treatment as the order events.
        String receiptCsvPath = PaymentOrderCheckIn.class.getResource("/ReceiptEvent.csv").getPath();
        DataStream<String> receiptLines = env.readTextFile(receiptCsvPath);
        SingleOutputStreamOperator<ReceiptEvent> parsedReceipts = receiptLines.map(new MapFunction<String, ReceiptEvent>() {
            @Override
            public ReceiptEvent map(String line) throws Exception {
                // Columns: order id, payment platform type, timestamp
                String[] fields = line.split(",");
                return new ReceiptEvent(fields[0], fields[1], Long.parseLong(fields[2]));
            }
        });
        // Allow events to arrive up to 1s out of order.
        SingleOutputStreamOperator<ReceiptEvent> timedReceipts = parsedReceipts.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ReceiptEvent>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(ReceiptEvent element) {
                        // presumably the CSV timestamp is in seconds; scaled to milliseconds here
                        return element.getTimestamp() * 1000L;
                    }
                });
        KeyedStream<ReceiptEvent, String> receiptsByOrderId = timedReceipts.keyBy(receipt -> receipt.getOrId());

        // Two-stream join implemented with intervalJoin.
        intervalJoinImpl(paysByOrderId, receiptsByOrderId);

        // Two-stream join implemented with connect() + a process function.
        processImpl(paysByOrderId, receiptsByOrderId);

        env.execute("tx match with join job");
    }

    /**
     * Matches pay events with receipt events by connecting the two keyed streams and
     * running {@link OrderMatchFunction} over them.
     * <p>
     * Matched pairs are printed with the prefix {@code "matched"}; events emitted to the
     * two side outputs are printed with {@code "unmatched pays"} and
     * {@code "unmatched receipts"} respectively.
     *
     * @param orderDataStream  pay events keyed by order id
     * @param receipDataStream receipt events keyed by order id
     */
    public static void processImpl(KeyedStream<OrderEvent, String> orderDataStream, KeyedStream<ReceiptEvent, String> receipDataStream) {
        // Fully parameterised (the element type is the matched pair) instead of a raw type,
        // so the compiler checks every use of the result stream.
        SingleOutputStreamOperator<Tuple2<OrderEvent, ReceiptEvent>> resultStream = orderDataStream
                .connect(receipDataStream)
                .process(new OrderMatchFunction());
        resultStream.print("matched");
        resultStream.getSideOutput(unmatchedPayEventOutputTag).print("unmatched pays");
        resultStream.getSideOutput(unmatchedReceiptEventOutputTag).print("unmatched receipts");
    }

    /**
     * Matches pay events with receipt events using Flink's interval join and prints the
     * matched pairs with the prefix {@code "intervalJoinImpl"}.
     * <p>
     * Limitation: this approach only produces the pairs that satisfy the join condition;
     * events without a partner are not reported anywhere.
     *
     * @param orderDataStream  pay events keyed by order id
     * @param receipDataStream receipt events keyed by order id
     */
    public static void intervalJoinImpl(KeyedStream<OrderEvent, String> orderDataStream, KeyedStream<ReceiptEvent, String> receipDataStream) {
        // The bounds are relative to the order (left) event: a receipt joins when its
        // timestamp lies between 3 seconds before and 5 seconds after the order's timestamp.
        DataStream<Tuple2<OrderEvent, ReceiptEvent>> resultStream = orderDataStream.intervalJoin(receipDataStream)
                .between(Time.seconds(-3), Time.seconds(5))
                .process(new OrderMatchWithJoinFunction());  // emits every pair that satisfies the bounds
        /*
         * Sample output:
         * (OrderLog{userId=34729, action='pay', orId='sd76f87d6', timestamp=1970-01-19 08:53:50},ReceiptLog{orId='sd76f87d6', payEquipment='wechat', timestamp=1970-01-19 08:53:50})
         * (OrderLog{userId=34730, action='pay', orId='3hu3k2432', timestamp=1970-01-19 08:53:50},ReceiptLog{orId='3hu3k2432', payEquipment='alipay', timestamp=1970-01-19 08:53:50})
         * (OrderLog{userId=34746, action='pay', orId='3243hr9h9', timestamp=1970-01-19 08:53:50},ReceiptLog{orId='3243hr9h9', payEquipment='wechat', timestamp=1970-01-19 08:53:50})
         * (OrderLog{userId=34747, action='pay', orId='329d09f9f', timestamp=1970-01-19 08:53:50},ReceiptLog{orId='329d09f9f', payEquipment='alipay', timestamp=1970-01-19 08:53:50})
         */
        resultStream.print("intervalJoinImpl");
    }

    /**
     * Join function used by {@code intervalJoinImpl}: for every (order, receipt) pair handed
     * to it by the interval join, it emits the two events together as a {@link Tuple2}.
     */
    public static class OrderMatchWithJoinFunction extends ProcessJoinFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>> {
        @Override
        public void processElement(OrderEvent left, ReceiptEvent right, Context ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
            // No filtering or transformation: forward the pair exactly as received.
            Tuple2<OrderEvent, ReceiptEvent> matchedPair = Tuple2.of(left, right);
            out.collect(matchedPair);
        }
    }

    /**
     * Reconciles pay events (input 1) with receipt events (input 2) for the current key.
     * <p>
     * Whichever event arrives first is buffered in keyed state and an event-time timer is
     * registered for it. If the partner arrives before the timer fires, the pair is emitted
     * on the main output, the state is cleared and the pending timer is cancelled. If the
     * timer fires first, whatever is still buffered is emitted to the matching side output
     * ({@code unmatchedPayEventOutputTag} or {@code unmatchedReceiptEventOutputTag}).
     * <p>
     * NOTE(review): only one pay and one receipt are buffered per key; a second event of
     * the same kind arriving before a match overwrites the first one.
     */
    public static class OrderMatchFunction extends CoProcessFunction<OrderEvent, ReceiptEvent, Tuple2<OrderEvent, ReceiptEvent>> {
        /** How long (event time, ms) a buffered pay event waits for its receipt. */
        private static final long PAY_WAITS_FOR_RECEIPT_MS = 5000L;
        /** How long (event time, ms) a buffered receipt waits for its pay event. */
        private static final long RECEIPT_WAITS_FOR_PAY_MS = 3000L;

        // Keyed state holding the pay event and the receipt event of the current transaction.
        transient ValueState<OrderEvent> payEventState = null;
        transient ValueState<ReceiptEvent> receiptEventState = null;

        /**
         * Obtains the two keyed state handles from the runtime context.
         *
         * @param parameters passed through to {@code super.open}
         * @throws Exception if the superclass or state lookup fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open~~~");
            super.open(parameters);
            payEventState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("pay", OrderEvent.class));
            receiptEventState = getRuntimeContext().getState(new ValueStateDescriptor<ReceiptEvent>("receipt", TypeInformation.of(ReceiptEvent.class)));
        }

        /** Event-time instant (ms) at which a buffered pay event is given up on. */
        private static long payTimeoutAt(OrderEvent pay) {
            return pay.getTimestamp() * 1000L + PAY_WAITS_FOR_RECEIPT_MS;
        }

        /** Event-time instant (ms) at which a buffered receipt event is given up on. */
        private static long receiptTimeoutAt(ReceiptEvent receipt) {
            return receipt.getTimestamp() * 1000L + RECEIPT_WAITS_FOR_PAY_MS;
        }

        /**
         * Handles a pay event: emits a match if the receipt is already buffered, otherwise
         * buffers the pay event and starts its timeout.
         *
         * @param orderEvent the incoming pay event
         * @param context    gives access to the timer service
         * @param collector  main output for matched pairs
         * @throws Exception if state access fails
         */
        @Override
        public void processElement1(OrderEvent orderEvent, Context context, Collector<Tuple2<OrderEvent, ReceiptEvent>> collector) throws Exception {
            System.out.println("==processElement1=>" + orderEvent);
            ReceiptEvent receipt = receiptEventState.value();
            if (receipt != null) {
                // The receipt arrived first: emit the matched pair.
                collector.collect(new Tuple2<>(orderEvent, receipt));
                // Cancel the timeout registered when the receipt was buffered; left in place
                // it would fire later and could flush a newer event stored under this key.
                context.timerService().deleteEventTimeTimer(receiptTimeoutAt(receipt));
                receiptEventState.clear();
                payEventState.clear();
            } else {
                // No receipt yet: wait up to 5 seconds of event time for it.
                context.timerService().registerEventTimeTimer(payTimeoutAt(orderEvent));
                payEventState.update(orderEvent);
            }
        }

        /**
         * Handles a receipt event: emits a match if the pay event is already buffered,
         * otherwise buffers the receipt and starts its timeout.
         *
         * @param receiptEvent the incoming receipt event
         * @param context      gives access to the timer service
         * @param collector    main output for matched pairs
         * @throws Exception if state access fails
         */
        @Override
        public void processElement2(ReceiptEvent receiptEvent, Context context, Collector<Tuple2<OrderEvent, ReceiptEvent>> collector) throws Exception {
            OrderEvent pay = payEventState.value();
            if (pay != null) {
                // The pay event arrived first: emit the matched pair.
                collector.collect(new Tuple2<>(pay, receiptEvent));
                // Cancel the timeout registered when the pay event was buffered.
                context.timerService().deleteEventTimeTimer(payTimeoutAt(pay));
                receiptEventState.clear();
                payEventState.clear();
            } else {
                // No pay event yet: wait up to 3 seconds of event time for it.
                context.timerService().registerEventTimeTimer(receiptTimeoutAt(receiptEvent));
                receiptEventState.update(receiptEvent);
            }
        }

        /**
         * Timeout handler: whichever event is still buffered never got its partner, so it is
         * sent to the corresponding side output, and both states are cleared.
         *
         * @param timestamp the timer's timestamp in milliseconds
         * @param ctx       used to emit to the side outputs
         * @param out       main output (not written to here)
         * @throws Exception if state access fails
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent, ReceiptEvent>> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            System.out.println("==onTimer==>" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timestamp)));
            // Read each state once and reuse the value.
            OrderEvent pendingPay = payEventState.value();
            if (pendingPay != null) {
                ctx.output(unmatchedPayEventOutputTag, pendingPay);
            }
            ReceiptEvent pendingReceipt = receiptEventState.value();
            if (pendingReceipt != null) {
                ctx.output(unmatchedReceiptEventOutputTag, pendingReceipt);
            }
            receiptEventState.clear();
            payEventState.clear();
        }
    }
}
